基于TairTS实现秒级监控

随着不断增长的监控指标与数据流量,监控系统变得越来越复杂,同时对监控系统的时效性提出了更高的要求。本文介绍基于TairTS轻松搭建高并发场景的秒级监控系统。

TairTS简介

TairTSTair自研的Module,依托Tair(企业版)提供实时且高并发的查询、写入性能,支持对历史时序数据的更新或累加,支持时间线级别的TTL设定,保证每条时间线都可以按时间窗口自动滚动,同时采用高效的Gorilla压缩算法与特定存储,极大降低存储成本。更多信息,请参见TS

秒级监控介绍

图 1. 秒级监控架构图示例秒级监控架构图

本文以上图为例介绍秒级监控架构。Console发布秒级监控配置,对应APP接收配置后通过MQTT协议写入到Collector中,Collector处理数据后写入到Tair数据库中。

  • 高并发查询场景

    在高并发查询场景中,TairTS不仅可以保证查询的性能,还支持降采样、属性过滤、分批查询、多种数值函数等条件下的聚合操作,满足不同业务进行多维度筛选与查看,同时支持将批量查询与聚合计算集成到单条命令中,减少网络交互,实现毫秒级响应,帮助您及时定位问题。

  • 高并发写入场景

    在高并发写入场景中,随着APP规模的增大,单点的Collector将会成为瓶颈。为此,TairTS支持对历史时序数据的原地更新或累加,保障多Collector并发写入正确性,同时节省内存空间。并发写入的代码示例如下:

    import com.aliyun.tair.tairts.TairTs;
    import com.aliyun.tair.tairts.params.ExtsAggregationParams;
    import com.aliyun.tair.tairts.params.ExtsAttributesParams;
    import com.aliyun.tair.tairts.results.ExtsSkeyResult;
    import redis.clients.jedis.Jedis;
    
    public class test {
    
        protected static final String HOST = "127.0.0.1";
        protected static final int PORT = 6379;
    
    
        public static void main(String[] args) {
            try {
                Jedis jedis = new Jedis(HOST, PORT, 2000 * 100);
                if (!"PONG".equals(jedis.ping())) {
                    System.exit(-1);
                }
                TairTs tairTs = new TairTs(jedis);
                //Cluster模式时如下:
                //TairTsCluster tairTsCluster = new TairTsCluster(jedisCluster);
    
                String pkey = "cpu_load";
                String skey1 = "app1";
                long startTs = (System.currentTimeMillis() - 100000) / 1000 * 1000;
                long endTs = System.currentTimeMillis() / 1000 * 1000;
                String startTsStr = String.valueOf(startTs);
                String endTsStr = String.valueOf(endTs);
    
                tairTs.extsdel(pkey, skey1);
                long num = 5;
    
                //Collector A 并发更新。
                for (int i = 0; i < num; i++) {
                    double val = i;
                    long ts = startTs + i*1000;
                    String tsStr = String.valueOf(ts);
                    ExtsAttributesParams params = new ExtsAttributesParams();
                    params.dataEt(1000000000);
                    String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params);
                }
    
                ExtsAggregationParams paramsAgg = new ExtsAggregationParams();
                paramsAgg.maxCountSize(10);
                paramsAgg.aggAvg(1000);
    
                System.out.println("Collector A并发更新后结果:");
                ExtsSkeyResult rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg);
                for (int i = 0; i < num; i++) {
                    System.out.println("    ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue());
                }
    
                //Collector B并发更新。
                for (int i = 0; i < num; i++) {
                    double val = i;
                    long ts = startTs + i*1000;
                    String tsStr = String.valueOf(ts);
                    ExtsAttributesParams params = new ExtsAttributesParams();
                    params.dataEt(1000000000);
                    String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params);
                }
    
    
                System.out.println("Collector B 并发更新后结果:");
                rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg);
                for (int i = 0; i < num; i++) {
                    System.out.println("    ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    执行结果:

    Collector A并发更新后结果:
        ts: 1597049266000, value: 0.0
        ts: 1597049267000, value: 1.0
        ts: 1597049268000, value: 2.0
        ts: 1597049269000, value: 3.0
        ts: 1597049270000, value: 4.0
    Collector B并发更新后结果:
        ts: 1597049266000, value: 0.0
        ts: 1597049267000, value: 2.0
        ts: 1597049268000, value: 4.0
        ts: 1597049269000, value: 6.0
        ts: 1597049270000, value: 8.0